package cn.tenmg.flink.jobs.quickstart.servlet;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Optional;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.SerializedThrowable;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import cn.tenmg.flink.jobs.FlinkJobsClient;
import cn.tenmg.flink.jobs.clients.StandaloneRestClusterClient;
import cn.tenmg.flink.jobs.config.loader.XMLConfigLoader;
import cn.tenmg.flink.jobs.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.quickstart.model.Response;
import cn.tenmg.flink.jobs.quickstart.utils.HttpUtils;

/**
 * Request handler for scheduling flink-jobs tasks.
 * <p>
 * Every POST request is dispatched on the suffix of its request URI:
 * {@code /start} submits a job, {@code /stop} stops a job, and anything else
 * returns the status of a job. Request bodies are parsed as JSON and every
 * outcome is written back through {@link HttpUtils#response}.
 * 
 * @author June wjzhao@aliyun.com
 *
 */
public class FlinkJobsServlet extends HttpServlet {

	/**
	 * Serialization version identifier.
	 */
	private static final long serialVersionUID = 121370836842222472L;

	/**
	 * Client shared by all requests to submit, stop and query jobs.
	 */
	private static final FlinkJobsClient<RestClusterClient<StandaloneClusterId>> client = new StandaloneRestClusterClient();

	/**
	 * Dispatches a POST request to {@code start}, {@code stop} or {@code info}
	 * depending on how the request URI ends.
	 * 
	 * @param req
	 *            the incoming request
	 * @param resp
	 *            the response to write the result to
	 */
	@Override
	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		String uri = req.getRequestURI();
		if (uri.endsWith("/start")) {// start a job
			start(req, resp);
		} else if (uri.endsWith("/stop")) {// stop a job
			stop(req, resp);
		} else {// query job information
			info(req, resp);
		}
	}

	/**
	 * Submits the job described by the {@code flink-jobs} XML configuration found
	 * in the JSON request body and responds with the hexadecimal job id (or
	 * {@code null} when the client returns no id).
	 * <p>
	 * Any failure (unparsable body, invalid configuration, submission error) is
	 * reported as an unsuccessful response.
	 * 
	 * @param req
	 *            the incoming request whose body holds the configuration
	 * @param resp
	 *            the response to write the result to
	 */
	private void start(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		try {
			JSONObject json = JSON.parseObject(HttpUtils.getRequestBody(req));
			FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(json.getString("flink-jobs"));// parse the XML configuration
			JobID jobId = client.submit(flinkJobs);
			// The response writer is deliberately NOT fetched/closed here: closing it
			// before the catch block runs would discard the failure response below.
			HttpUtils.response(resp, Response.success(true).data(jobId == null ? null : jobId.toHexString()));
		} catch (Exception e) {
			e.printStackTrace();
			HttpUtils.response(resp, Response.success(false).message("请正确填写配置内容"));
		}
	}

	/**
	 * Stops the job whose id is given in the JSON request body and responds with
	 * the value returned by the client (reported as the savepoint).
	 * <p>
	 * Any failure, including an unparsable body or a missing job id, is reported
	 * as an unsuccessful response carrying the exception message.
	 * 
	 * @param req
	 *            the incoming request whose body holds {@code jobId}
	 * @param resp
	 *            the response to write the result to
	 */
	private void stop(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		try {
			String savepoint = client.stop(readJobId(req));
			HttpUtils.response(resp, Response.success(true).message("保存点：" + savepoint));
		} catch (Exception e) {
			HttpUtils.response(resp, Response.success(false).message(e.getMessage()));
		}
	}

	/**
	 * Responds with the status of the job whose id is given in the JSON request
	 * body. When the job has failed, the message of the failure cause is included
	 * if the job result provides one.
	 * <p>
	 * Any failure, including an unparsable body or a missing job id, is reported
	 * as an unsuccessful response carrying the exception message.
	 * 
	 * @param req
	 *            the incoming request whose body holds {@code jobId}
	 * @param resp
	 *            the response to write the result to
	 */
	private void info(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		try {
			Info info = new Info();
			JobID jobId = readJobId(req);
			JobStatus jobStatus = client.getJobStatus(jobId);
			info.setState(jobStatus == null ? null : jobStatus.name());
			if (JobStatus.FAILED.equals(jobStatus)) {
				JobResult jobResult = client.requestJobResult(jobId);
				if (jobResult != null) {
					Optional<SerializedThrowable> throwable = jobResult.getSerializedThrowable();
					// An Optional may be empty; calling get() unchecked would throw
					// NoSuchElementException and hide the status we already have.
					if (throwable != null && throwable.isPresent()) {
						info.setMessage(throwable.get().getMessage());
					}
				}
			}
			HttpUtils.response(resp, Response.success(true).data(info));
		} catch (Exception e) {
			HttpUtils.response(resp, Response.success(false).message(e.getMessage()));
		}
	}

	/**
	 * Reads the JSON request body and extracts the {@code jobId} field as a
	 * {@link JobID}.
	 * 
	 * @param req
	 *            the incoming request
	 * @return the job id parsed from its hexadecimal form
	 * @throws IllegalArgumentException
	 *             if the body yields no JSON object or contains no {@code jobId}
	 */
	private JobID readJobId(HttpServletRequest req) throws ServletException, IOException {
		JSONObject json = JSON.parseObject(HttpUtils.getRequestBody(req));
		// NOTE(review): parseObject appears to return null for an empty body — guard
		// against it so the caller gets a clear message instead of a bare NPE.
		String jobId = json == null ? null : json.getString("jobId");
		if (jobId == null || jobId.isEmpty()) {
			throw new IllegalArgumentException("jobId不能为空");
		}
		return toJobID(jobId);
	}

	/**
	 * Converts a hexadecimal string into a {@link JobID}.
	 * 
	 * @param jobId
	 *            the hexadecimal representation of the job id
	 * @return the corresponding job id
	 */
	private JobID toJobID(String jobId) {
		return JobID.fromHexString(jobId);
	}

	/**
	 * Job information returned by the status query: the job state name and, for
	 * failed jobs, the failure message.
	 */
	public static class Info {

		/** Name of the job status, or {@code null} when the status is unknown. */
		private String state;

		/** Failure message, set only when one is available for a failed job. */
		private String message;

		public String getState() {
			return state;
		}

		public void setState(String state) {
			this.state = state;
		}

		public String getMessage() {
			return message;
		}

		public void setMessage(String message) {
			this.message = message;
		}
	}
}
